ingest/ledgerbackend: Decouple meta pipe buffering in CaptiveStellarCore backend #3187
Conversation
// close it now because there may be some ledgers in a buffer.
select {
case b.c <- metaResult{nil, err}:
case <-b.runner.getProcessExitChan():
I wonder if we can simplify the shutdown logic further by using contexts and passing them down through each component to the stellarCoreRunner instance. Within stellarCoreRunner we could use https://golang.org/pkg/os/exec/#CommandContext to execute stellar-core (rough sketch below). Perhaps we can open an issue separate from this PR to investigate whether using contexts would help.
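A minimal sketch of that idea, assuming a hypothetical helper (runCoreWithContext is made up for illustration; it is not the actual stellarCoreRunner API):

package main

import (
	"context"
	"os/exec"
)

// runCoreWithContext starts stellar-core via exec.CommandContext:
// cancelling ctx kills the subprocess, which could replace part of
// the manual shutdown plumbing.
func runCoreWithContext(ctx context.Context, binary string, args ...string) error {
	cmd := exec.CommandContext(ctx, binary, args...)
	if err := cmd.Start(); err != nil {
		return err
	}
	// Wait returns once the process exits, including after ctx
	// cancellation triggers a kill.
	return cmd.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Close() could simply call cancel()

	_ = runCoreWithContext(ctx, "stellar-core", "run")
}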
I looked into using CommandContext and I think it won't help much. We still need the close() method to do some cleanup, and we can't do that with a context alone (a sketch below illustrates the kind of cleanup that stays outside the context). But we can definitely investigate how to write shutdown code for more complicated object graphs. For example, to figure out how to close CaptiveCore, bufferedLedgerMetaReader and stellarCoreRunner, I drew a graph to understand the dependencies.
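A hypothetical illustration of that point (the coreRunner fields below are made up, not the actual stellarCoreRunner internals): cancelling a context can kill the process, but other resources still need explicit release.

package sketch

import (
	"context"
	"io"
	"os"
)

type coreRunner struct {
	cancel   context.CancelFunc // would kill stellar-core via CommandContext
	metaPipe io.ReadCloser      // pipe the ledger meta is read from
	tempDir  string             // captive core working directory
}

// close sketches the cleanup that remains necessary even with contexts.
func (r *coreRunner) close() error {
	r.cancel()
	if err := r.metaPipe.Close(); err != nil {
		return err
	}
	return os.RemoveAll(r.tempDir)
}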
Issue: #3200.
// usually wait for a specific time duration before checking if the ledger is
// available. When catching up with a small buffer this can increase the overall
// time because ledgers are not available.
ledgerReadAheadBufferSize = 100
In the worst case scenario we can have 1000 MB of ledgers in the queue. That scenario is unlikely to happen; however, if you're interested, it should be possible to ensure the queued ledgers don't exceed a fixed threshold of memory usage by using a circular buffer:
diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go
index 14707c76..0619e16e 100644
--- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go
+++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go
@@ -2,6 +2,7 @@ package ledgerbackend
import (
"bufio"
+ "fmt"
"io"
"time"
@@ -34,8 +35,59 @@ const (
// available. When catching up and small buffer this can increase the overall
// time because ledgers are not available.
ledgerReadAheadBufferSize = 100
+ maxBytesBuffered = 100 * 1024 * 1024 // 100mb
)
+type stream struct {
+ queue []uint32
+ length int
+ sum uint32
+ idx int
+}
+
+func newStream(capacity int) *stream {
+ return &stream{
+ queue: make([]uint32, capacity),
+ }
+}
+
+func (s *stream) seek(num int) error {
+ if num > s.length {
+ return fmt.Errorf("seek %v exceeds queue length %v", num, s.length)
+ }
+
+	for ; num > 0; num-- {
+ s.sum -= s.queue[s.idx]
+ s.length--
+ s.idx = (s.idx + 1) % len(s.queue)
+ }
+ return nil
+}
+
+func (s *stream) add(val uint32) {
+ s.sum += val
+ tail := (s.idx + s.length) % len(s.queue)
+ if s.length == len(s.queue) {
+ s.sum -= s.queue[s.idx]
+ s.idx = (s.idx + 1) % len(s.queue)
+ } else {
+ s.length++
+ }
+ s.queue[tail] = val
+}
+
+func (s *stream) tailSum(n int) (uint32, error) {
+ if n > s.length {
+ return 0, fmt.Errorf("tail %v exceeds queue length %v", n, s.length)
+ }
+ if n < s.length {
+ if err := s.seek(s.length - n); err != nil {
+ return 0, err
+ }
+ }
+ return s.sum, nil
+}
+
type metaResult struct {
*xdr.LedgerCloseMeta
err error
@@ -64,6 +116,7 @@ type metaResult struct {
type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
+ queue *stream
runner stellarCoreRunnerInterface
}
@@ -72,6 +125,7 @@ type bufferedLedgerMetaReader struct {
func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader {
return bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
+ queue: newStream(ledgerReadAheadBufferSize),
r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize),
runner: runner,
}
@@ -93,7 +147,14 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
}
}
- for frameLength > metaPipeBufferSize && len(b.c) > 0 {
+ for {
+ totalBytes, err := b.queue.tailSum(len(b.c))
+ if err != nil {
+ return nil, errors.Wrap(err, "could not obtain buffer size")
+ }
+ if totalBytes == 0 || totalBytes+frameLength <= maxBytesBuffered {
+ break
+ }
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
select {
case <-b.runner.getProcessExitChan():
@@ -117,6 +178,8 @@ func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMet
return nil, err
}
}
+
+ b.queue.add(frameLength)
return &xlcm, nil
}
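For intuition, here is how the proposed stream queue behaves; a sketch that assumes it runs inside the ledgerbackend package where the stream type from the diff is defined (values are illustrative):

s := newStream(3)      // ring buffer with capacity 3
s.add(10)              // queue: [10], sum 10
s.add(20)              // queue: [10 20], sum 30
s.add(30)              // queue: [10 20 30], sum 60
sum, _ := s.tailSum(2) // drops the oldest entry; queue: [20 30], sum == 50
s.add(40)              // ring wraps around; queue: [20 30 40], sum 90
_ = sum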
It makes sense, but because this scenario is unlikely I'd vote for simply making ledgerReadAheadBufferSize smaller, say, 20 (at roughly 10 MB per buffered ledger in the worst case, that caps RAM usage around 200 MB). We can open an issue to do it in the future.
Changed in ee3226c.
PR Checklist

PR Structure
- This PR avoids mixing refactoring changes with feature changes (split into two PRs otherwise).
- This PR's title starts with the name of the package that is most changed in the PR, ex. services/friendbot, or all or doc if the changes are broad or impact many packages.

Thoroughness
- I've updated any docs (.md files, etc.) affected by this change. Take a look in the docs folder for a given service, like this one.

Release planning
- The CHANGELOG is updated as needed with deprecations, added features, breaking changes, and DB schema changes.
- I've decided if this PR requires a new major/minor version according to semver, or if it's mainly a patch change. The PR is targeted at the next release branch if it's not a patch change.
What

This commit introduces bufferedLedgerMetaReader, which decouples buffering and unmarshaling from stellarCoreRunner and CaptiveStellarCore.

Why

bufferedLedgerMetaReader fixes multiple issues:
- bufferedLedgerMetaReader allowed rewriting the shutdown code to a much simpler version. Now bufferedLedgerMetaReader and CaptiveStellarCore listen to a single shutdown signal: stellarCoreRunner.getProcessExitChan(). When the Stellar-Core process terminates, the bufferedLedgerMetaReader.Start goroutine stops, and CaptiveStellarCore returns a user-friendly error from the PrepareRange and GetLedger methods. When CaptiveStellarCore.Close() is called, it kills the Stellar-Core process, triggering the shutdown code explained above. (A sketch of this pattern follows the list.)
- It makes stellarCoreRunner and CaptiveStellarCore simpler.
- bufferedLedgerMetaReader waits for its buffer to be consumed before reading more ledgers into memory, preventing increased memory usage.
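A minimal sketch of the single-shutdown-signal pattern from the first point above (the channel shapes are illustrative: metaCh stands in for bufferedLedgerMetaReader's output channel and exitChan for stellarCoreRunner.getProcessExitChan()):

package main

import (
	"errors"
	"fmt"
)

// getLedger sketches how a consumer selects on the exit channel so it
// never blocks forever once the Stellar-Core process dies.
func getLedger(metaCh <-chan string, exitChan <-chan struct{}) (string, error) {
	select {
	case meta := <-metaCh:
		return meta, nil
	case <-exitChan:
		// The process terminated: surface a user-friendly error
		// instead of waiting on meta that will never arrive.
		return "", errors.New("stellar-core process exited unexpectedly")
	}
}

func main() {
	metaCh := make(chan string, 1)
	exitChan := make(chan struct{})
	metaCh <- "ledger close meta"
	fmt.Println(getLedger(metaCh, exitChan))
}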
Known limitations

No unit tests for bufferedLedgerMetaReader yet. I'll add them after initial approval.